RDD vs. Dataset vs. DataFrame 您所在的位置:网站首页 rdd dataframe dataset转换 RDD vs. Dataset vs. DataFrame

RDD vs. Dataset vs. DataFrame

2024-06-04 08:04| 来源: 网络整理| 查看: 265

主要介绍RDD、Dataset/DataFrame三者之间的转换,和一些使用上的差异。

basicRDD[T]DataFrameDataSet[T]DataFrame vs. DatasetDataFrame取值相互转换Dataset -> DataFrame: toDFDataFrame -> Dataset: as[T]RDD -> DataFrame方法一:使用反射推断schema方法二:指定一个自定义的schema方法总结原因分析 TODODataFrame -> RDD: rddRDD和Dataset/DataFrame在一些方法上的区别RefbasicRDD[T]

出现的早,一般用于非结构化的数据。比如通过SparkContext的sequenceFile方法读取一个sequenceFile,或者parallelize通过一个Seq构建rdd。

rdd没有schema,没有结构。一般rdd转成有结构的DataFrame后只有一列column,名为value。

DataFrame

适用于结构化的数据。比如通过SparkSession的read获得DataFrameReader,再用csv、json、parquet等方法读取相应的数据文件。或者使用format指定一个类型,再load文件,比如读取avro:

1 spark.read.format("com.databricks.spark.avro").load("xxx")

很显然,不管怎么搞,读的都是格式化的数据。要么自带schema(csv、json、avro),要么指定schema(通过一个schema方法schema(schemaString: String),但目前还不太用到)。

另外,DataFrameReader还提供了一个text方法text(paths: String*),返回DataFrame,它的schema以value为column前缀,拼上column名称。但是一般也是就一个value,比如读一个普通纯文本文件:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 scala> spark.read.text("licenses/LICENSE-protobuf.txt") res1: org.apache.spark.sql.DataFrame = [value: string] scala> val df = spark.read.text("licenses/LICENSE-protobuf.txt") df: org.apache.spark.sql.DataFrame = [value: string] scala> df.printSchema root |-- value: string (nullable = true) scala> df.schema res7: org.apache.spark.sql.types.StructType = StructType(StructField(value,StringType,true)) scala> df.show(3, false) +---------------------------------------------------------------------------+ |value | +---------------------------------------------------------------------------+ |This license applies to all parts of Protocol Buffers except the following:| | | | - Atomicops support for generic gcc, located in | +---------------------------------------------------------------------------+ only showing top 3 rows

普通RDD转成的DataFrame也就这样。

DataSet[T]

同DataFrame基本一样,API都合并了。Dataset是所含内容为T的数据集,一般和case class一起用,T就是class的类型。 获取数据后可以直接用T.xxx获取其某个字段的内容。

DataFrame vs. Dataset

DataFrame虽然是结构化的,但是其所含的值并没有对应一个class,所以spark就定义了一个class名为Row,作为DataFrame的数据的数据结构。所以DataFrame等价于Dataset[Row]。

但是Row又没有定义field,具体包含哪些字段,没法直接取出来,所以只能通过Row的各种方法比如getAs[Int](xxx)来获取属性xxx的内容。而Dataset在自定义了case class之后可以很自由的获得每一行的信息。所以DataFrame在获取内部数据的时候,方法数据的属性没有Dataset方便。

DataFrame是Dataset这个泛型的一种具象化:T为Row。类似于List和List的区别。

DataFrame取值

DataFrame获取字段示例:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 scala> df.show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ scala> df.foreach(line => println(line.getAs[String]("name"))) Michael Andy Justin scala> df.foreach(line => println(line.getAs[String]("age"))) null 30 19 scala> df.foreach(line => println(line.getAs[Int]("age"))) null 30 19 scala> df.foreach(line => println(line.getAs[Int]("name"))) Michael Andy Justin 取列:$(xxx)、col(xxx)、your_df(xxx)取行的某一个字段:getAs[T](xxx)相互转换Dataset -> DataFrame: toDF

这个很简单,因为只是把case class封装成Row,相当于抹掉class的属性了:

1 2 3 import spark.implicits._ val dataFrame = dataset.toDF

toDF(): DataFrame

Converts this strongly typed collection of data to generic Dataframe. In contrast to the strongly typed objects that Dataset operations work on, a Dataframe returns generic Row objects that allow fields to be accessed by ordinal or name.

DataFrame -> Dataset: as[T] 1 2 3 4 5 import spark.implicits._ // 定义类的字段名和类型 case class Person(age : Int, name : String) extends Serializable val ds = df.as[Person]

这就要定义一个case class,为每一列对应一个具体类型的属性。然后使用as方法(org.apache.spark.sql.Encoder里的)转换。

as[U](implicit arg0: Encoder[U]): Dataset[U]

Returns a new Dataset where each record has been mapped on to the specified type. The method used to map columns depend on the type of U:

When U is a class, fields for the class will be mapped to columns of the same name (case sensitivity is determined by spark.sql.caseSensitive).When U is a tuple, the columns will be mapped by ordinal (i.e. the first column will be assigned to _1).When U is a primitive type (i.e. String, Int, etc), then the first column of the DataFrame will be used.

一定要导入spark对象(SparkSession实例)的implicits里的隐式转换:import spark.implicits._,因为用了里面的Encoder来进行对象的转换操作。

RDD -> DataFrame

DataFrame转为RDD后的类型是org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]。但有趣的地方在于,想把该类型再转为DataFrame是不行的:error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

RDD转DataFrame,必须要有schema。可通过两种方式搞定schema,两种方法说白了就是要么spark自己推断schema,要么程序员手动指定schema。

方法一:使用反射推断schema

定义一个case class:

The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns.

https://spark.apache.org/docs/latest/sql-getting-started.html#inferring-the-schema-using-reflection

步骤:

一般读到的rdd是一个RDD[String],要先转为RDD[T],T是一个case class;toDF; 1 2 3 4 5 6 7 8 9 // For implicit conversions from RDDs to DataFrames import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF()

一个自己的例子:

1 2 3 4 5 6 7 scala> case class DummyAvro(s: String) defined class DummyAvro scala> avrodf.rdd.map(x => DummyAvro(x.toString)).toDF.show(2, false) +--------------------------+ |s | +--------------------------+

此时的column name是case class的属性。

当然,把Row搞成基本类型,比如String(使用Row的toString方法)也是可以的:

1 2 3 4 scala> avrodf.rdd.map(x => x.toString).toDF.show(2, false) +--------------------------+ |value | +--------------------------+

此时df的column只有一个:value。

或者直接搞一个Tuple:

1 2 3 val whodf = whoami.map{ map => (map.getOrElse("guid", ""), map.getOrElse("action", ""), map.getOrElse("unit", ""), map.getOrElse("type", ""), map.getOrElse("date", ""), map.getOrElse("keyfrom", "")) }.toDF("guid", "action", "unit", "type", "date", "keyfrom")

另外,如果将一个df转为另一个df,后者也是需要Encoder的。比如这里给Dataset[Map[K, V]]定义了一个Encoder:

1 2 3 4 5 6 7 8 // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))

Dataset对map的定义:

1 2 3 4 5 def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] (Java-specific) Returns a new Dataset that contains the result of applying func to each element. def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] (Scala-specific) Returns a new Dataset that contains the result of applying func to each element.

Encoder是必须的,只不过是显式还是implicit调用的问题。

另外Row的getValuesMap方法定义如下:

1 2 def getValuesMap[T](fieldNames: Seq[String]): Map[String, T] Returns a Map consisting of names and values for the requested fieldNames For primitive types if value is null it returns 'zero value' specific for primitive ie. 0 for Int - use isNullAt to ensure that value is not null 方法二:指定一个自定义的schema一般读到的rdd是一个RDD[String],要先转为RDD[Row];创建一个匹配Row结构的(StructType的)schema;转换的时候指定该schema。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import org.apache.spark.sql.Row import org.apache.spark.sql.types._ // RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) // 创建schema val schema = StructType(fields) // RDD[String]转换为RDD[Row]:Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // 应用schema,RDD[ROW]转为DF val peopleDF = spark.createDataFrame(rowRDD, schema) https://spark.apache.org/docs/latest/sql-getting-started.html#programmatically-specifying-the-schema方法总结

结合以上两种转换方法,可总结如下:

RDD[String]转RDD[T]或RDD[Row];如果是case class RDD,直接toDF就好了,会自动推断schema。如果是RDD[Row],Row又不是基本类型,使用SparkSession#createDataFrame(RDD, StructType)手动指定个schema;

DataFrame#schema和它的Row#schema是同一个schema。

原因分析 TODO

因为对scala的implicit暂时还不是很了解,所以先盲猜一下:

RDD toDF实际是使用DatasetHolder的toDF。DatasetHolder本身就hold一个Dataset。也就是说,RDD调用toDF之前其实已经可以转为Dataset了。使用的是SparkSession的implicits,它继承了SQLImplicits类:A collection of implicit methods for converting common Scala objects into [[Dataset]]s.。所以它就是将scala对象转为Dataset的!它里面有一堆Encoder,比如StringEncoder。rddToDatasetHolder方法使用相应的Encoder将RDD转为Dataset。

在这些Encode的接口org.apache.spark.sql.Encoder的doc中,有这样的描述:

1 2 3 4 5 6 7 8 9 * == Scala == * Encoders are generally created automatically through implicits from a `SparkSession`, or can be * explicitly created by calling static methods on [[Encoders]]. * * * import spark.implicits._ * * val ds = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder) *

Java不能用隐式转化,所以就很清晰:

1 2 3 4 5 6 7 * == Java == * Encoders are specified by calling static methods on [[Encoders]]. * * * List data = Arrays.asList("abc", "abc", "xyz"); * Dataset ds = context.createDataset(data, Encoders.STRING()); *

当然,熟悉之后你也可以说比scala更麻烦……

DataFrame -> RDD: rdd

直接调用rdd方法即可,返回RDD[T]。

1 2 scala> avrodf.rdd res6: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[15] at rdd at :26

rdd: RDD[T]

Represents the content of the Dataset as an RDD of T.

DataFrame转为RDD后的类型是org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]。

DataFrame虽然转成了RDD,但是取出后的对象T还是Row类型,所以它依然有结构:

1 2 scala> avrodf.rdd.first.schema res27: org.apache.spark.sql.types.StructType = StructType(StructField(guid,StringType,true), StructField(abtest,StringType,true), StructField(advertising,StringType,true), StructField(alg_id,LongType,true), StructField(apps,ArrayType(StringType,true),true), StructField(course_category_id,StringType,true), StructField(course_sub_category_ids,StringType,true), StructField(date,StringType,true), StructField(dict_role,StringType,true), StructField(dict_state,StringType,true), StructField(dict_state_pred,StringType,true), StructField(dict_tags,ArrayType(StringType,true),true), StructField(end,StringType,true), StructField(flTag,StringType,true), StructField(image,StringType,true), StructField(imei,StringType,true), StructField(infoid,StringType,true), StructField(ip,StringType,true), StructF...

依然可以用getAs[T](xx)取字段。

也就是说RDD也可以存储结构化数据!甚至存有case class的对象也可以。如果把数据取出来,就是case class对应的对象了。使用RDD存储结构化数据不方便的地方大概是RDD中没有Dataset中的select、show等可以直接操作结构化对象的方法,因为它没有为结构化数据设计这些方法。

RDD和Dataset/DataFrame在一些方法上的区别

RDD除了不具备select等结构化数据DataFrame才有的方法,其他一些名称相同的方法其实也是有差异的。

比如groupByKey:

rdd中,groupByKey操作的是类型为Tuple2的PairRDD,没有参数,直接将同key的value聚合起来,返回RDD[(K, Iterable[V])];Dataset中,数据都是表,一列一列的,而非Tuple2。groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T],方法传入一个map函数,根据Dataset的数据类型T产生一个K类型的key(自造一个key),然后返回一个K,T pair的Dataset。这个Dataset的类型是KeyValueGroupedDataset[K, T]。接下来才可以对它做一些按key操作的行为(而PairRDD一开始就可以根据key作出group by的行为),但是鉴于这个grouped dataset已经是按照key group过的,所以不再有类似groupByKey的操作,有一些其他的比如mapGroups[U](f: (K, Iterator[V]) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U],将(K, Iterator V)映射为U。。

一个例子:

1 2 3 4 5 6 7 my_df.groupByKey(x => x.getAs[String]("pv_device_id")) .mapGroups( (k, iter) => (k, iter.map(x => x.getAs[String]("package_name")).toSeq.toSet.toSeq) ) .toDF("deviceId", "packageNames") .sort(size($"packageNames").desc) .show(1000, false) 先自造一个key:pv_device_id;再将pv_device_id和T的pair映射为pv_device_id和package_name的pair。Refdatabricks的文章:https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html中文翻译版:https://zhuanlan.zhihu.com/p/35440915spark SQL的介绍:https://spark.apache.org/docs/latest/sql-getting-started.html#interoperating-with-rdds

总体来说,从RDD到DataFrame到Dataset,数据越来越结构化,类型越来越强。强类型需要严格的语法,同时会带来非常大的好处:编译时错误检查。

举例而言:

RDD基本不存在操作列的方法,这是结构化的数据抽象DataFrame/Dataset才有的。RDD通过map去操作value,具体操作的对不对得运行时才知道;如果在DataFrame中调用了API之外的函数时,编译器就可以发现这个错。不过,如果你使用了一个不存在的字段名字,那就要到运行时才能发现错误了;Dataset API都是用lambda函数和JVM类型对象表示的,所有不匹配的类型参数都可以在编译时发现。而且在使用Dataset时,分析错误也会在编译时被发现,这样就节省了开发者的时间和代价。


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有